from pyspark.sql import SparkSession #Entry point for DataFrame and SQL functionality
from pyspark.sql import SQLContext #NOTE(review): imported but never used below; kept because other chunks of this file may rely on it
# Build (or reuse) a SparkSession named 'csp1_jes' wired to the remote Hive
# metastore, so spark.sql() below can query the project's Hive tables.
spark = (SparkSession.builder.appName("csp1_jes").config("hive.metastore.uris", "thrift://ip-10-1-2-24.ap-south-1.compute.internal:9083").enableHiveSupport().getOrCreate())
#this creates a Spark session that allows us to connect to spark and configure the program
spark #bare expression: displays the session object in a notebook cell
## The database and table structure
#The database for my project is named 'csp1_jes'
spark.sql("use csp1_jes")
#List of all tables in my database
spark.sql("show tables").show()
#desc prints the schema (column names and types) of a table
#Titles - Job titles of employees
spark.sql("desc titles_jes").show()
spark.sql("Select * from titles_jes").show()
#Departments - The departments in the organisation
spark.sql("desc departments_jes").show()
spark.sql("Select * from departments_jes").show()
#Department Employees - which department each employee belongs to
spark.sql("desc dept_emp_jes").show()
spark.sql("Select * from dept_emp_jes limit 20").show()
#Department Managers - Managers of each department in the organisation
spark.sql("desc dept_manager_jes").show()
spark.sql("select * from dept_manager_jes limit 20").show()
#Employees - Personal employee information
spark.sql("desc employees_jes").show()
spark.sql("select * from employees_jes limit 20").show()
#Salaries - salaries of each employee
spark.sql("desc salaries_jes").show()
spark.sql("select * from salaries_jes limit 20").show()
#Employee number, name and sex of every employee together with their salary (inner join on emp_no)
spark.sql("select e.emp_no, e.last_name, e.first_name, e.sex, s.salary from employees_jes e JOIN salaries_jes s on e.emp_no = s.emp_no").show()
#Employees hired during 1986, ordered by hire date.
#NOTE: hire_date appears to be stored as an 'M/d/yyyy' string (judging by the
#literals used here), so a plain BETWEEN compares lexicographically and matches
#wrong rows (e.g. '1/5/1987' < '12/31/1986' as strings).  Parsing both the
#column and the bounds with to_date makes the range a genuine date comparison.
spark.sql("select first_name, last_name, hire_date from employees_jes where to_date(hire_date, 'M/d/yyyy') between to_date('1/1/1986', 'M/d/yyyy') and to_date('12/31/1986', 'M/d/yyyy') order by to_date(hire_date, 'M/d/yyyy')").show()
#Manager of each department with the department name
spark.sql("select d.dept_no, d.dept_name, dm.emp_no, e.last_name, e.first_name from departments_jes d join dept_manager_jes dm on d.dept_no = dm.dept_no join employees_jes e on dm.emp_no = e.emp_no").show()
#Department of every employee
spark.sql("select de.emp_no, e.last_name, e.first_name, d.dept_name from dept_emp_jes de join employees_jes e on de.emp_no = e.emp_no join departments_jes d on de.dept_no = d.dept_no").show()
#Employees named Hercules whose last name starts with 'B'
spark.sql("select first_name, last_name, sex from employees_jes where first_name = 'Hercules' and last_name Like 'B%'").show()
#Employees belonging to department d007
#NOTE(review): like '%d007%' matches any dept_no that merely contains 'd007';
#a plain equality (= 'd007') would state the intent directly — confirm the
#stored dept_no values carry no surrounding characters before simplifying.
spark.sql("select e.emp_no, e.last_name, e.first_name, d.dept_name from employees_jes e join dept_emp_jes de \
on de.emp_no = e.emp_no join departments_jes d on de.dept_no = d.dept_no where d.dept_no like '%d007%'").show()
#Employees belonging to departments d007 or d005
spark.sql("select e.emp_no, e.last_name, e.first_name, d.dept_name from employees_jes e join dept_emp_jes de \
on de.emp_no = e.emp_no join departments_jes d on de.dept_no = d.dept_no where d.dept_no like '%d007%' or d.dept_no like '%d005%'").show()
#Frequency of each last name, most common first
spark.sql("select last_name, COUNT(last_name) as lm_frequency from employees_jes group by last_name order by lm_frequency DESC").show()
import numpy as np #Basically a fast linear algebra library
import seaborn as sns #Data visualisation based on matplotlib
import matplotlib.pyplot as plt #Data visualisation
import pandas as pd #To convert hive data into a pandas dataframe for data visualization
#Pull the full salaries table into a pandas DataFrame for local visualisation
emp_salaries = spark.sql("select * from salaries_jes").toPandas()
emp_salaries.head(20)
#Summary statistics (count, mean, std, quartiles) of the salary column
emp_salaries['salary'].describe()
#pip install -U seaborn
plt.figure(figsize=(15, 10))
sns.histplot(data=emp_salaries, x='salary', bins=10, kde=True, color='#FB4E0B')
plt.title('Employee Salary Distribution', fontsize=18)
plt.xlabel('Employee Salary', fontsize=16)
plt.ylabel('Employee count', fontsize=16)
#Convert each remaining Hive table straight to a pandas DataFrame
emp_titles = spark.sql("select * from titles_jes").toPandas()
emp_titles.head()
emp = spark.sql("select * from employees_jes").toPandas()
emp.head()
dept_emp = spark.sql("select * from dept_emp_jes").toPandas()
dept_emp.head()
#Align the title table's key name with the employees table so they can be joined
emp_new = emp_titles.rename(columns={'title_id': 'emp_title_id'})
emp_new.head()
#Join salaries with employees, then attach the job titles
emp_comb = emp_salaries.merge(emp, on="emp_no", how="inner")
emp_comb.head()
emp_comb2 = emp_comb.merge(emp_new, on="emp_title_id", how="inner")
emp_comb2.head()
#Average of every numeric column per job title.
#numeric_only=True makes the old implicit behaviour explicit: pandas >= 2.0
#raises a TypeError when .mean() hits the non-numeric columns (names, dates)
#that older versions silently dropped.
emp_sal_grouped = emp_comb2.groupby("title").mean(numeric_only=True)
emp_sal_grouped
#The group key 'title' became the index; therefore it needs to be moved back
#to a regular column to use for plotting later on
emp_sal_grouped2 = emp_sal_grouped.reset_index()
emp_sal_grouped2
emp_sal_grouped2 = emp_sal_grouped2.sort_values('salary')
plt.figure(figsize = (15, 10))
sns.barplot(x = 'salary', y = 'title', data = emp_sal_grouped2, palette = 'rocket_r')
plt.title('Employee Salary Distribution', fontsize = 18)
plt.xlabel('Employee Salary', fontsize = 16)
plt.ylabel('Employee Job Title', fontsize = 16)
- It is apparent that senior staff earn the highest salaries, followed by Managers, and lastly the remaining staff
#Combine department membership with the employee/salary/title data into one frame
emp_full = dept_emp.merge(emp_comb2, on="emp_no", how="inner")
emp_full.head()
import plotly.express as px
#Plotly manages its own figure objects, so the matplotlib plt.figure() calls
#that preceded these plots only created empty, orphaned figures; they have
#been removed.
plot1 = px.histogram(emp_full, x = 'salary', color= "sex", nbins = 10, title='Salary distribution by sex',
labels={'salary':'Employee Salary'})
plot1.show()
#NOTE(review): y='emp_no' plots the MEAN employee number per title (an artifact
#of the groupby-mean) — confirm that is the intended quantity for this chart.
plot2 = px.bar(emp_sal_grouped2, x = 'title', y = 'emp_no', color = 'salary')
plot2.show()
#Making a copy so that I can use it if there are any errors in future code
emp_full2 = emp_full.copy()
#Dataframe information
emp_full2.info()
#Change the column types accordingly for further use
emp_full2['hire_date'] = pd.to_datetime(emp_full2['hire_date'])
emp_full2['birth_date'] = pd.to_datetime(emp_full2['birth_date'])
#The last_date column records when an employee left the organisation.
#The raw values carry stray carriage returns (presumably Windows line endings
#in the source data — TODO confirm), so strip '\r' before parsing as datetimes.
emp_full2['last_date']
emp_full2['last_date'] = emp_full2['last_date'].replace(to_replace = '\r', value = '', regex = True)
emp_full2['last_date']
#Employees who never left have no last_date and therefore become NaT here
emp_full2['last_date'] = pd.to_datetime(emp_full2['last_date'])
emp_full2.info()
There are a total of 7 categorical variables:
- dept_no
- emp_title_id
- first_name
- last_name
- sex
- last_performance_rating
- title
But not all of them are important for the ML model, so I will only transform the important ones: emp_title_id/title, dept_no, sex, and last_performance_rating. Since these columns have few distinct values and are easily manageable, I will encode the columns manually instead of using encoding functions and increasing the dimensionality.
#Binary-encode sex: 'F' -> 0, everything else -> 1
#NOTE(review): any value that is not exactly 'F' (including NaN) maps to 1 —
#acceptable only if the column is strictly 'F'/'M' with no missing values; verify.
emp_full2['sex'] = np.where(emp_full2['sex'] == 'F', 0, 1)
emp_full2['last_performance_rating'].value_counts()
The last performance rating is an ordinal variable where the ratings, from highest to lowest, are S, A, B, C, and PIP. When encoding an ordinal variable the usual convention is to give the highest value to the highest ordinal level. Hence I will assign the ratings as follows:
- S -> 4
- A -> 3
- B -> 2
- C -> 1
- PIP -> 0
#Ordinal-encode the performance rating: S (best) -> 4 down to PIP (worst) -> 0.
#A single dict-based replace performs all five substitutions in one pass instead
#of five chained .replace calls over the whole column.
emp_full2['last_performance_rating'] = emp_full2['last_performance_rating'].replace(
    {'S': 4, 'A': 3, 'B': 2, 'C': 1, 'PIP': 0})
emp_full2['last_performance_rating'].value_counts()
emp_full2['emp_title_id'].value_counts()
Employee title id doesn't have any inherent order, so it doesn't matter which numbers are used to encode it.
#Nominal-encode the title id (no inherent order, so the integer codes are
#arbitrary).  One dict-based replace substitutes all seven ids in a single pass.
emp_full2['emp_title_id'] = emp_full2['emp_title_id'].replace(
    {'s0001': 0, 's0002': 1, 'e0001': 2, 'e0002': 3,
     'e0003': 4, 'e0004': 5, 'm0001': 6})
emp_full2['emp_title_id'].value_counts()
emp_full2['dept_no'].value_counts()
There are a total of nine departments and they are not ordinal, so it does not matter which numbers are used to encode them.
#Nominal-encode the nine department numbers (d001..d009 -> 0..8) with a single
#dict-based replace instead of nine chained .replace calls.
emp_full2['dept_no'] = emp_full2['dept_no'].replace(
    {'d001': 0, 'd002': 1, 'd003': 2, 'd004': 3, 'd005': 4,
     'd006': 5, 'd007': 6, 'd008': 7, 'd009': 8})
emp_full2['dept_no'].value_counts()
emp_full2.head()
#Visualise missing data: each contrasting cell in the heatmap marks a null value
plt.figure(figsize = (15, 10))
null_plot = sns.heatmap(emp_full2.isnull(), cbar = False)
plt.title('Null value distribution', fontsize = 18)
plt.xlabel('Variables', fontsize = 16)
plt.ylabel('Count', fontsize = 16)
#Exact null count per column, to back up the visual check
emp_full2.isna().sum()
- There are no null values in any variable except 'last_date', which means only a few employees have left the organisation
emp_full2.columns
#Correlation plot to find relationships between the variables in the data
corr_features = ['dept_no', 'salary', 'emp_title_id', 'birth_date', 'sex', 'hire_date', 'no_of_projects',
'last_performance_rating', 'left_org', 'last_date']
corr1 = emp_full2[corr_features]
corr1.head()
plt.figure(figsize = (25, 20))
#numeric_only=True keeps the legacy behaviour (the datetime columns
#birth_date/hire_date/last_date are skipped) and avoids the error that
#pandas >= 2.0 raises when .corr() meets non-numeric columns.
corr_plot = sns.heatmap(corr1.corr(numeric_only = True), annot = True, fmt = ".2%", cmap = 'Oranges', annot_kws = {'size': 8})
#Title typo fixed: 'betweena' -> 'between'
plt.title('Correlation between all variables')
plt.show()
- There doesn't seem to be much correlation between the variables therefore I will keep all of them for the model building
#Convert the fully encoded pandas frame back into a Spark DataFrame for MLlib
emp_spark = spark.createDataFrame(emp_full2)
emp_spark.printSchema()
emp_spark.show()
#Creating a dataframe with the relevant variables
#NOTE(review): 'left_org' is included here as the label column, not a feature
emp_mfeatures = emp_spark.select('dept_no', 'salary', 'emp_title_id', 'birth_date', 'sex', 'hire_date', 'no_of_projects',
'last_performance_rating', 'last_date', 'left_org')
emp_mfeatures.printSchema()
#converting the datatypes to spark datatypes
from pyspark.sql.types import StringType, DateType, IntegerType
#Cast every model column to IntegerType in one loop (same column order as the
#original chain of ten near-identical withColumn calls).
#BUG FIXED: the original chain ended with a stray line-continuation backslash,
#which spliced the following printSchema() call into the expression and made
#the statement a SyntaxError when run as a plain script.
#NOTE(review): the timestamp columns (birth_date, hire_date, last_date) become
#epoch-second integers under this cast, and rows for employees who never left
#have a null last_date — confirm downstream stages tolerate those nulls.
for _column in ('birth_date', 'hire_date', 'last_date', 'dept_no', 'salary',
                'emp_title_id', 'sex', 'no_of_projects',
                'last_performance_rating', 'left_org'):
    emp_mfeatures = emp_mfeatures.withColumn(_column,
                                             emp_mfeatures[_column].cast(IntegerType()))
emp_mfeatures.printSchema()
features = emp_mfeatures.columns
#Feature columns for the assembler.
#BUG FIXED: the original slice columns[0:11] swept in 'left_org' — the label
#itself — leaking the target into the feature vector and letting the model
#trivially "predict" it.  The label is now excluded explicitly.
emp_list = [c for c in emp_mfeatures.columns if c != 'left_org']
#using the assembler to create a single vector column from a list of columns
from pyspark.ml.feature import VectorAssembler
assemblerInputs = emp_list
vecAssembler = VectorAssembler(inputCols = assemblerInputs, outputCol = "features")
#Defining the model: L2-regularised logistic regression on the binary left_org label
from pyspark.ml.classification import LogisticRegression
emp_lr = LogisticRegression(featuresCol = "features", labelCol = "left_org", regParam = 1.0)
#Building a pipeline to create the model
from pyspark.ml import Pipeline
#Defining the pipeline based on the stages created in previous steps
emp_pipeline = Pipeline(stages = [vecAssembler, emp_lr])